Flink开发 您所在的位置:网站首页 flink 时间窗口不固定 Flink开发

Flink开发

2024-06-26 11:46| 来源: 网络整理| 查看: 265

Flink开发-事件时间窗口EventTimeWindows 1.Non-Keyed Tumbling Windows2.Keyed Tumbling Windows3.Keyed Session Windows

Event Time指的是数据流中每个元素或者每个事件自带的时间属性,一般是事件发生的时间。由于事件从发生到进入Flink时间算子之间有很多环节,一个较早发生的事件因为延迟可能较晚到达,因此使用Event Time意味着事件到达有可能是乱序的。

使用Event Time时,最理想的情况下,我们可以一直等待所有的事件到达后再进行时间窗口的处理。假设一个时间窗口内的所有数据都已经到达,基于Event Time的流处理会得到正确且一致的结果。无论我们是将同一个程序部署在不同的计算环境,还是在相同的环境下多次计算同一份数据,都能够得到同样的计算结果。我们根本不同担心乱序到达的问题。

但这只是理想情况,现实中无法实现,因为我们既不知道究竟要等多长时间才能确认所有事件都已经到达,更不可能无限地一直等待下去。在实际应用中,当涉及到对事件按照时间窗口进行统计时,Flink会将窗口内的事件缓存下来,直到接收到一个Watermark,Watermark假设不会有更晚数据的到达。Watermark意味着在一个时间窗口下,Flink会等待一个有限的时间,这在一定程度上降低了计算结果的绝对准确性,而且增加了系统的延迟。比起其他几种时间语义,使用Event Time的好处是某个事件的时间是确定的,这样能够保证计算结果在一定程度上的可预测性。

一个基于Event Time的Flink程序中必须定义:

每条数据的Event Time时间戳作为Event Tme,如何生成Watermark。我们可以使用数据自带的时间作为Event Time,也可以在数据到达Flink后人为给Event Time赋值。

总之,使用Event Time的优势是结果的可预测性,缺点是缓存较大,增加了延迟,且调试和定位问题更复杂。

1.Non-Keyed Tumbling Windows public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //在老版本API中我们使用eventTime作为时间标准时,设置EventTime作为时间标准 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //5S窗口时间范围为[1609430400000,1609430405000) //当前分区中数据的携带最大EventTime - 乱序延迟时间 >= 窗口结束时间 就会触发该窗口 DataStreamSource socketStream = env.socketTextStream("localhost", 8888); //提取数据中的时间,将时间转成精确到毫秒的Long类型,并生成WaterMark,Watermark可以理解为允许数据的延迟时间。调用该方法前后不会对数据格式产生影响。 SingleOutputStreamOperator dataWithWaterMark = socketStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) { @Override//element就是我们传入的数据 public long extractTimestamp(String element) { //提取数据中的时间 return Long.parseLong(element.split(",")[0]); } }); SingleOutputStreamOperator nums = dataWithWaterMark.map(new MapFunction() { @Override public Integer map(String s) throws Exception { return Integer.parseInt(s.split(",")[1]); } }); SingleOutputStreamOperator sum = nums.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).sum(0); sum.print(); env.execute(""); }

输入内容:

C:\Users\zhibai>nc -lp 8888 1609430400000,1 1609430403000,2 1609430404400,1 1609430404999,3

输出结果:

2> 7 2.Keyed Tumbling Windows

当使用单并行度source作为数据源时,可以当从该数据源接收到的EventTime满足窗口触发条件时,会将下游的所有并行度窗口都触发。当使用多并行度Source作为数据源是,必须保证从Source的所有并行度都接收到了满足条件的EventTime窗口才会触发。

public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource socketStream = env.socketTextStream("localhost", 8888); SingleOutputStreamOperator map = socketStream.map(new MapFunction() { @Override public Tuple3 map(String s) throws Exception { String[] split = s.split(","); return Tuple3.of(Long.parseLong(split[0]), split[1], Integer.parseInt(split[2])); } }).setParallelism(2); SingleOutputStreamOperator timestampsAndWatermarks = map.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) { @Override public long extractTimestamp(Tuple3 element) { return element.f0; } }); KeyedStream keyedStream = timestampsAndWatermarks.project(1, 2).keyBy(0); SingleOutputStreamOperator sum = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1); sum.print(); env.execute(""); }

输入内容:

C:\Users\zhibai>nc -lp 8888 1000,saprk,1 1000,flink,1 1000,flink,1 3300,saprk,1 4444,hadoop,1 4999,flink,1 4999,hadoop,1

输出结果:

2> (saprk,2) 8> (hadoop,2) 7> (flink,3) 3.Keyed Session Windows

SessionWindos触发也会依据数据的EventTime来触发,程序会依据数据流入的最新的时间来当作当前时间,以此为依据来触发以满足的时间间隔的数据分区。其他与上文提到的TumblingWindows相同。

public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource socketStream = env.socketTextStream("localhost", 8888); //调用该方法前后不会对数据格式产生影响 SingleOutputStreamOperator dataWithWaterMark = socketStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) { @Override//element就是我们传入的数据 public long extractTimestamp(String element) { //提取数据中的时间 return Long.parseLong(element.split(",")[0]); } }); SingleOutputStreamOperator wordAndOne = dataWithWaterMark.map(new MapFunction() { @Override public Tuple2 map(String s) throws Exception { String[] fields = s.split(","); return Tuple2.of(fields[0], Integer.parseInt(fields[1])); } }); KeyedStream keyedStream = wordAndOne.keyBy(new KeySelector() { @Override public String getKey(Tuple2 s) throws Exception { return s.f0; } }); //EventTime WindowedStream eventTimewindow = keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5))); eventTimewindow.sum(1).print(); env.execute(""); }

输入内容:

C:\Users\zhibai>nc -lp 8888 1000,1 1000,3 2345,1 2345,6 3000,1 8000,1

输出结果:

6> (1000,4) 6> (2345,7) 6> (3000,1)


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有